[fix][broker] fix delayedMessagesCount error in InMemoryDelayedDeliveryTracker #25076
TakaHiR07 wants to merge 3 commits into apache:master
Conversation
```java
        .computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>())
        .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap());
if (!roaring64Bitmap.contains(entryId)) {
    roaring64Bitmap.add(entryId);
```
It looks like .addLong should be used in the Roaring64Bitmap API.
```diff
- roaring64Bitmap.add(entryId);
+ roaring64Bitmap.addLong(entryId);
```
The .add method works too, but the method signature takes a long array (long...). Perhaps the compiler is able to optimize that, so it might not make a difference.
It's unfortunate that Roaring64Bitmap doesn't have the checkedAdd method as there is in RoaringBitmap. That would eliminate the need for the .contains check.
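The contains-then-add pattern being discussed can be sketched like this. Note this is an illustrative stand-in, not the broker code: it uses a `HashSet<Long>` in place of `Roaring64Bitmap` (which has no `checkedAdd`), and the class and field names are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;

public class DuplicateCheck {
    private final Set<Long> entryIds = new HashSet<>();
    private long delayedMessagesCount = 0;

    // Only bump the counter when the entryId was actually inserted,
    // mirroring the contains()-before-add() fix in this PR.
    public boolean addMessage(long entryId) {
        if (entryIds.contains(entryId)) {
            return false; // duplicate: the count must not change
        }
        entryIds.add(entryId);
        delayedMessagesCount++;
        return true;
    }

    public long count() {
        return delayedMessagesCount;
    }

    public static void main(String[] args) {
        DuplicateCheck t = new DuplicateCheck();
        t.addMessage(1L);
        t.addMessage(1L); // duplicate, ignored
        t.addMessage(2L);
        System.out.println(t.count()); // prints 2
    }
}
```

With `RoaringBitmap.checkedAdd` the `contains` probe and the `add` would collapse into a single call returning whether the value was new; since `Roaring64Bitmap` lacks it, the two-step check is the workaround.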
Yes, unfortunately.
"add" or "addLong" I guess is the same after compile, sure it may be better to use "addLong" directly.
Besides, I think there is no need to consider concurrent situation in InMemoryDelayedDeliveryTracker. I don't see any code point out that concurrent situation would occur.
Threading question: addMessage() and getScheduledMessages() are invoked under synchronized (this) in the dispatcher (e.g. PersistentDispatcherMultipleConsumers#trackDelayedDelivery), but clearDelayedMessages() doesn’t seem synchronized and InMemoryDelayedDeliveryTracker#clear() isn’t synchronized either.
Is clear() guaranteed to be called under the same lock, or should we align with BucketDelayedDeliveryTracker#clear() (synchronized) to avoid concurrent access to delayedMessageMap/bitmaps?
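The alignment being suggested can be sketched as follows. This is a hypothetical minimal class, not the actual tracker: it only shows the locking shape, with a `TreeMap` standing in for the real `delayedMessageMap` of bitmaps:

```java
import java.util.TreeMap;

public class TrackerSync {
    // timestamp -> entryId (stand-in for the real nested bitmap structure)
    private final TreeMap<Long, Long> delayedMessageMap = new TreeMap<>();

    // addMessage()/getScheduledMessages() run under the dispatcher's monitor;
    // modeled here as synchronized methods on the tracker itself.
    public synchronized void addMessage(long deliverAt, long entryId) {
        delayedMessageMap.put(deliverAt, entryId);
    }

    // Aligning with BucketDelayedDeliveryTracker#clear(): making clear()
    // synchronized ensures it cannot race with the other mutators.
    public synchronized void clear() {
        delayedMessageMap.clear();
    }

    public synchronized int size() {
        return delayedMessageMap.size();
    }

    public static void main(String[] args) {
        TrackerSync t = new TrackerSync();
        t.addMessage(100L, 1L);
        t.clear();
        System.out.println(t.size()); // prints 0
    }
}
```

Note this only helps if every caller goes through the same monitor; if the dispatcher locks on its own `this`, the tracker's `synchronized` would be a second, independent lock.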
@TakaHiR07 Please check the previous comment by @Denovo1998.
btw. this code location was discussed in the review: https://github.com/apache/pulsar/pull/24430/changes#r2156278377
```java
updateTimer();
checkAndUpdateHighest(deliverAt);
```
One thought: should updateTimer() and checkAndUpdateHighest(deliverAt) run only when we actually insert a new entryId?
With the current structure, duplicate addMessage() calls still update highestDeliveryTimeTracked / messagesHaveFixedDelay, which could disable the fixed-delay optimization even though the tracker state didn't change.
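The suggested guard could look roughly like this. It is a sketch, not the broker code: a `HashSet` stands in for the bitmap, and `checkAndUpdateHighest` here is a simplified model of the real method:

```java
import java.util.HashSet;
import java.util.Set;

public class FixedDelayGuard {
    private final Set<Long> entryIds = new HashSet<>();
    private long highestDeliveryTimeTracked = 0;
    private boolean messagesHaveFixedDelay = true;

    public boolean addMessage(long entryId, long deliverAt) {
        if (entryIds.contains(entryId)) {
            // Duplicate: skip the bookkeeping so messagesHaveFixedDelay
            // is not flipped by an insert that changed nothing.
            return false;
        }
        entryIds.add(entryId);
        checkAndUpdateHighest(deliverAt);
        return true;
    }

    // Simplified model: an out-of-order deliverAt disables fixed delay.
    private void checkAndUpdateHighest(long deliverAt) {
        if (deliverAt < highestDeliveryTimeTracked) {
            messagesHaveFixedDelay = false;
        }
        highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, deliverAt);
    }

    public boolean hasFixedDelay() {
        return messagesHaveFixedDelay;
    }
}
```

With the guard, replaying an old (entryId, deliverAt) pair is a no-op; without it, the stale deliverAt would compare below the tracked highest and disable the optimization.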
Great catch. I have checked the earliest fixed-delay implementation in #16609. The issue already existed there: on a duplicate addMessage(), highestDeliveryTimeTracked stays correct, but messagesHaveFixedDelay would be incorrectly set to false.
I will look into why duplicate addMessage() calls happen later, and I would prefer to open another PR to fix that additional issue.
```java
log.error("[{}] Delayed message tracker getScheduledMessages should not < 0, number is: {}",
        dispatcher.getName(), n);
```
About the new n < 0 branch: this should be unreachable in normal flow. One potential way to hit it is int overflow from int cardinality = (int) entryIds.getLongCardinality().
Would it be better to keep cardinality as long (and compare cardinality <= (long) n) to eliminate overflow, instead of only logging when n < 0?
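The overflow concern can be shown with a tiny self-contained example (plain arithmetic, no Pulsar code; method names are illustrative): narrowing a cardinality above Integer.MAX_VALUE to int yields a negative value, which is exactly how the n < 0 branch could be reached.

```java
public class CardinalityOverflow {
    // Mimics the risky pattern: int cardinality = (int) entryIds.getLongCardinality()
    static int narrowed(long cardinality) {
        return (int) cardinality;
    }

    // Safer comparison: keep the cardinality as a long and widen n instead.
    static boolean fitsWithinLimit(long cardinality, int n) {
        return cardinality <= (long) n;
    }

    public static void main(String[] args) {
        long huge = 3_000_000_000L;                    // > Integer.MAX_VALUE
        System.out.println(narrowed(huge));            // negative after the int cast
        System.out.println(fitsWithinLimit(huge, 10)); // false, no overflow
    }
}
```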
You are right. I think it is another issue, and using a long value in both places is better. Do you think we should fix it in this PR, or will you push another PR to fix it?
```java
// case2: addMessage() with duplicate entryId,
```
In case2 the comment says it enters cardinality > n, but with getScheduledMessages(10) and 4 unique entryIds it should hit the cardinality <= n branch. Could we adjust the comment to match the scenario (case3 seems to be the one exercising cardinality > n)?
Yes, I have changed the comment.
@thetumbled Do you have a chance to review the current PR?
Maybe we should figure out why duplicate entry IDs are added multiple times, if this class does not intentionally allow that behavior.
Pull request overview
This pull request fixes a bug in the InMemoryDelayedDeliveryTracker where duplicate message entries could cause incorrect delayed message counts, potentially leading to NPE issues. The fix adds a duplicate check before incrementing the counter and improves error handling for edge cases.
Key changes:
- Add a duplicate entry check in `addMessage()` using `Roaring64Bitmap.contains()` before adding entries
- Improve error handling in `getScheduledMessages()` to explicitly handle and log the `n < 0` case
- Add comprehensive test coverage for duplicate entry scenarios across multiple test cases
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java | Implements duplicate entry check in addMessage() and enhances error handling in getScheduledMessages() |
| pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java | Adds comprehensive test method testDelayedMessagesCountWithDuplicateEntryId() covering three scenarios: multiple timestamps with duplicates, single timestamp with duplicates, and partial retrieval with duplicates |
Force-pushed d12846f to 9b562dc
Motivation
An NPE issue occurred with delayed messages. The root cause is in delayedMessagesCount: InMemoryDelayedDeliveryTracker#addMessage() does not check whether the entryId already exists in the Roaring64Bitmap, so delayedMessagesCount no longer matches the number of entries actually tracked in the map.
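The counting bug can be reproduced in miniature like this. JDK maps and sets stand in for `Long2ObjectRBTreeMap` and `Roaring64Bitmap`; the field names mirror the tracker, but this is not the actual broker code:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CountBugDemo {
    // timestamp -> ledgerId -> entryIds
    private final Map<Long, Map<Long, Set<Long>>> delayedMessageMap = new HashMap<>();
    private long delayedMessagesCount = 0;

    public void addMessage(long timestamp, long ledgerId, long entryId) {
        Set<Long> entryIds = delayedMessageMap
                .computeIfAbsent(timestamp, k -> new HashMap<>())
                .computeIfAbsent(ledgerId, k -> new HashSet<>());
        // The fix: only count entries that were not already tracked.
        // Without this check, a duplicate addMessage() would increment the
        // counter while the set size stays the same, and the two drift apart.
        if (!entryIds.contains(entryId)) {
            entryIds.add(entryId);
            delayedMessagesCount++;
        }
    }

    public long getNumberOfDelayedMessages() {
        return delayedMessagesCount;
    }

    public static void main(String[] args) {
        CountBugDemo t = new CountBugDemo();
        t.addMessage(100L, 1L, 1L);
        t.addMessage(100L, 1L, 1L); // duplicate entryId, must not be counted twice
        t.addMessage(100L, 1L, 2L);
        System.out.println(t.getNumberOfDelayedMessages()); // prints 2
    }
}
```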
Modifications
Verifying this change
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete